# 操作符

欢迎回来！在[数据流](./dataflow)中，我们学习了 `dora` 应用程序的整体蓝图以及它如何连接各个部分。在[节点](./node)中，我们深入探讨了这些节点部分，并了解到每个节点通常作为一个独立的进程运行，执行特定的任务，并通过数据流 `YAML` 中定义的输入和输出进行通信。

现在，让我们探索一种在 `Node` 内部添加更多结构和可重用性的方法： **`Operator 操作符`** 的概念。

## 节点内部更小、可重复使用的步骤

假设你有一个负责处理图像的节点。该节点可能需要执行几个不同的步骤：

1. 将图像转换为灰度。
2. 将图像调整为特定尺寸。
3. 对图像应用过滤器。
4. 在过滤后的图像中寻找边缘。

你可以在一个节点的单一代码实现中编写所有这些逻辑。但是，如果你之后想在另一个节点或不同的数据流中重用“调整图像大小”或“应用滤镜”逻辑，你就必须复制粘贴代码，这并不理想。

这时， **`Operator`** 就派上用场了。`Operator` 是一种较小的模块化逻辑，设计用于在一种名为 `Dora Runtime Node` 的特殊节点内运行。`Operator` 允许你将单个节点的工作分解为可重用的独立步骤。

可以将节点想象成工厂车间的工作站（一栋独立的建筑），而操作符则是该工作站内的专用工具或机器。工作站（节点）是独立的单元，但其内部的工作由其工具集（操作符）完成。

## 操作符与自定义节点

让我们快速比较一下：

| 特征         | 自定义节点                                  | Dora 运行时节点和操作符                                        |
| :----------- | :------------------------------------------ | :------------------------------------------------------------- |
| **运行方式** | 独立节点                                    | 单进程                                                         |
| **包含**     | 一段自定义的代码                            | 一个或多个操作符                                               |
| **逻辑单元** | 整个节点代码                                | 单个操作符                                                     |
| **通讯**     | 使用 `dora Node API` 直接与 `dora` 系统交互 | 节点运行时使用 `dora Node API`; 操作符使用 `dora Operator API` |
| **可重用性** | 重用整个 `Node` 进程                        | 重复使用单个操作符逻辑模块                                     |

因此，虽然自定义节点是其自己独立的程序（ `.py` 、 `.rs` 可执行文件等），但 `Dora 运行时` 节点是一个运行 `dora-runtime` 程序的进程，然后加载和管理数据流 `YAML` 中指定的一个或多个操作员的代码。

## 在数据流 `YAML` 中定义操作符

运算符是在 `Dataflow YAML` 中节点的定义内定义的。您无需为节点的实现指定单个 `path` 或 `custom` 块，而是使用 `operators` 符块。

让我们想象一下之前的图像处理节点，现在使用操作符来实现。

```py title="dataflow.yml"
nodes:
  # ... other nodes like camera ...

  # Define our image processing Node that uses Operators
  - id: image-processor-node # This is the ID of the Node process
    inputs:
      image: camera/image # This Node takes image data from the camera Node
    outputs:
      - processed_image # This Node outputs the final processed image

    # This node is a Runtime Node and will load operators
    operators:
      # First Operator: Convert to Grayscale
      - id: grayscale-converter # ID of the operator within THIS node
        source: python # How to load this operator's code
        path: operators/grayscale_op.py # The code file for this operator
        inputs:
          input_image: image-processor-node/image # Takes input from the NODE's image input
        outputs:
          - gray_image # Outputs grayscale image

      # Second Operator: Resize
      - id: resizer # ID of the second operator
        source: python
        path: operators/resize_op.py
        inputs:
          input_image: grayscale-converter/gray_image # Takes input from the 'gray_image' output of 'grayscale-converter' OPERATOR
        outputs:
          - resized_image # Outputs resized image

      # Third Operator: Apply Filter
      - id: filter
        source: python
        path: operators/filter_op.py
        inputs:
          input_image: resizer/resized_image # Takes input from the 'resized_image' output of 'resizer' OPERATOR
        outputs:
          - filtered_image # Outputs filtered image

      # Fourth Operator: Edge Detector
      - id: edge-detector
        source: python
        path: operators/edge_op.py
        inputs:
          input_image: filter/filtered_image # Takes input from the 'filtered_image' output of 'filter' OPERATOR
        outputs:
          - processed_image # Outputs the final processed image

    # Notice: the 'processed_image' output of the 'edge-detector' operator
    # matches the 'processed_image' output of the 'image-processor-node'.
    # The runtime routes this automatically.
```

需要注意的关键事项：

- `nodes` 下的 `id` ( `image-processor-node` ) 是 `Node` 进程的 `ID`，和之前一样。
- 这里有一个 `operators` 列表，而不是 `path` 或 `custom` 。
- `operators` 列表中的每个项目定义了在此节点内运行的操作员 。
- 每个 `Operator` 都有自己的 `id` （ 在此节点内唯一）、 `source` （例如 `python` 、 `shared-library` ）和指向其代码的 `path` 。
- 操作员也有 `inputs` 和 `outputs` 。
- 连接语法仍然是 `source_id/stream_id` 。
- 节点自己的 `ID`（ `image-processor-node/image` ）：从外部数据流进入节点的数据被路由到操作员的输入。
- 同一节点内另一个 `Operator` 的 `ID`（ `grayscale-converter/gray_image` ）：数据在同一节点进程内的 `Operator` 之间直接流动。
- 如果 `Operator` 的输出 `id` 与 `Node` 的某个 `outputs` 匹配，则该输出将自动成为该 `Node` 到外部 `Dataflow` 的输出（ `edge-detector/processed_image` 变为 `image-processor-node/processed_image` ）。

这允许您在单个 `Node` 进程中定义一个迷你数据流。

## 操作员如何工作（操作员 API）

就像 `Node` 的代码使用 `dora Node API` 一样，`Operator` 的代码也使用 `dora Operator API` 。这个 `API` 更简单，因为 `Operator` 不需要担心网络连接或进程管理之类的事情；`Runtime Node` 会处理这些事情。

操作员的主要工作是对事件做出反应。最常见的事件是接收输入数据。

下面是我们的一个示例运算符（`grayscale-converter` 的 `Python` 代码的简化概念图，代码片段非常简短）：

```py title="operators/grayscale_op.py"
# operators/grayscale_op.py

from dora import DoraStatus

class Operator:
    """Converts incoming image to grayscale."""

    def on_event(
        self,
        dora_event, # The event received by the operator
        send_output, # Function to send outputs
    ) -> DoraStatus:
        
        if dora_event["type"] == "INPUT":
            # Check if the input is the one we expect
            if dora_event["id"] == "input_image":
                image_data = dora_event["value"] # Get the incoming image data
                
                print(f"Received image on input_image.")
                
                # --- Operator's specific logic ---
                # (Actual image processing code goes here)
                grayscale_image_data = convert_to_grayscale(image_data) 
                # ---------------------------------

                # Send the result to the operator's output
                send_output("gray_image", grayscale_image_data, dora_event["metadata"]) 

        return DoraStatus.CONTINUE # Keep running
```

在此 `Python` 代码片段中：

- `Operator` 代码的结构为 `Python` 类（或 `Rust` 中的函数）。
- 它有一个 `on_event` 方法（或函数），每当发生与该操作符有关的事件（例如接收输入或停止信号）时， `dora-runtime` 就会调用该方法。
- `dora_event` 字典包含有关事件的信息（其 `type` 、 `id` 、 `value` 等。我们将在下一章介绍**事件流** ）。
- `send_output` 函数（或方法）由运行时提供，供 `Operator` 在其定义的输出上发送数据。它接受输出 `id` 、 `data` 以及可选的 `metadata` 。

这个模式（ `on_event -> process -> send_output` ）是 `Operator` 逻辑的核心。

## 底层：运行时节点和操作符

当您使用 `dora run` 和定义了带有操作符的运行时节点的 `Dataflow YAML` 时，下面是发生情况的简化视图：

1. `dora CLI` 和 `Dora Daemon / Dora Coordinator` 读取数据流 `YAML`。
2. 他们将 `image-processor-node` 标识为一个节点。
3. 他们看到它有一个 `operators` 块，表明它是一个运行时节点。
4. 它们为该节点启动一个单独的进程 。该进程运行 `dora-runtime` 可执行文件。
5. `dora-runtime` 读取 `image-processor-node` 块中的 Operator 定义。
6. 它根据 `source` 和 `path` 加载 `grayscale-converter` 、 `resizer` 、 `filter` 和 `edge-detector` 的代码。
7. 它建立了沟通渠道：
    - 从外部数据流的 `camera/image` 到 `image-processor-node` 的 image 输入。
    - 内部，从运行时节点的 `image` 输入到 `grayscale-converter` 操作器的 input_image 输入。
    - 在内部，从 `grayscale-converter/gray_image` 输出到 `resizer/input_image` 输入。
    - 内部，从 `resizer/resized_image` 输出到 `filter/input_image` 输入。
    - 内部，从 `filter/filtered_image` 输出到 `edge-detector/input_image` 输入。
    - 从 `edge-detector/processed_image` 输出到外部 `Dataflow` 的 `image-processor-node/processed_image` 输出。
8. 然后， `dora-runtime` 进入事件循环，从外部 `Dataflow` 接收事件（例如新图片），并根据 YAML 中定义的内部连接，将它们分发给相关的 `Operator`。当一个 `Operator` 生成输出时，运行时会将其内部路由到另一个 `Operator`，或者将其作为 `Node` 输出发送出去。

**这是一个简化的序列图：**

![序列图](/operator01.png)

这显示了数据如何流入单个运行时节点进程，通过运行时在操作员之间内部传递，然后作为节点输出流出。

## 总结

在本章中，我们了解到 `Operator` 是运行在 `Dora Runtime` 节点进程中的模块化逻辑。它们在 `Dataflow YAML` 的 `Node` 块中定义，允许您在单个节点内构建复杂的处理流水线。`Operator` 使用 `Operator API` 接收输入并发送输出， `dora-runtime` 负责加载它们、在它们之间建立内部通信，以及在节点的外部输入/输出和 `Operator` 的输入/输出之间路由数据。这提供了一种创建可复用处理步骤的方法。

现在我们了解了节点和操作符，让我们看看数据在系统中移动的基本概念： **`Event Stream 事件流`** 。

